package com.lianda.example.hot.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Flink SQL de-duplication example.
 *
 * <p>Registers the Kafka-backed table {@code source_kafka} through DDL, numbers the rows of each
 * (userID, eventType, eventTime) partition with {@code ROW_NUMBER()} ordered by {@code proctime}
 * descending, keeps only the rows numbered 1, and prints the result as a retract stream.
 */
public class DeduplicationMain {

    /** DDL that declares the source table, its Kafka connector properties and JSON format. */
    private static final String SOURCE_DDL = String.join("",
            "create table source_kafka ",
            "( ",
            "    userID String, ",
            "    eventType String, ",
            "    eventTime String, ",
            "    productID String, ",
            "    proctime AS PROCTIME() ", // processing-time attribute
            ") with ( ",
            "    'connector.type' = 'kafka', ",
            "    'connector.version' = '0.11', ",
            "    'connector.properties.bootstrap.servers' = 'localhost:9092', ",
            "    'connector.properties.zookeeper.connect' = 'localhost:2181',   ",
            "    'connector.topic' = 'source_user_action', ",
            "    'connector.properties.group.id' = 'sql_test', ",
            "    'connector.startup-mode' = 'latest-offset', ",
            "    'format.type' = 'json' ",
            ")");

    /**
     * Query that keeps one row per (userID, eventType, eventTime): the one that sorts first
     * when the partition is ordered by {@code proctime} descending.
     */
    private static final String DEDUP_QUERY = String.join("",
            "SELECT userID, eventType, eventTime, productID  ",
            "FROM ( ",
            "  SELECT *, ",
            "     ROW_NUMBER() OVER (PARTITION BY userID, eventType, eventTime ORDER BY proctime DESC) AS rownum ",
            "  FROM source_kafka ",
            ") t ",
            "WHERE rownum = 1");

    /**
     * Sets up the table environment, registers the source table, runs the de-duplication
     * query and prints its retract stream, then submits the job under this class's simple name.
     *
     * @param args command-line arguments; not read
     * @throws Exception propagated from the Flink calls
     */
    public static void main(String[] args) throws Exception {
        StreamTableEnvironment tableEnv = createTableEnvironment();

        // Register the source table.
        tableEnv.sqlUpdate(SOURCE_DDL);

        // Run the query and print every emitted retract-stream record.
        Table deduplicated = tableEnv.sqlQuery(DEDUP_QUERY);
        tableEnv.toRetractStream(deduplicated, Row.class).print();

        tableEnv.execute(DeduplicationMain.class.getSimpleName());
    }

    /** Builds a streaming-mode, Blink-planner table environment on the default execution environment. */
    private static StreamTableEnvironment createTableEnvironment() {
        EnvironmentSettings blinkStreaming = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        return StreamTableEnvironment.create(executionEnv, blinkStreaming);
    }
}
